Source code for qtealeaves.tensors.tensor

# This code is part of qtealeaves.
#
# This code is licensed under the Apache License, Version 2.0. You may
# obtain a copy of this license in the LICENSE.txt file in the root directory
# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0.
#
# Any modifications or derivative works of this code must retain this
# copyright notice, and modified files need to carry a notice indicating
# that they have been altered from the originals.

"""
Tensor class based on numpy / cupy.

Quick programming guide
-----------------------

* numpy versus cupy is resolved as `xp`, which is provide as `_device_checks` function.
  Function acting on the tensors, must use `xp`; calculations on the shape of a tensor
  can be done directly with numpy.
* Multi-GPU support: all functions executing calculations on the GPU must use the
  decorator `gpu_switch` using the corresponding context manager. The decorator does
  not ensure that all input is on the same device. (Easy first-order check: if
  `_device_check` is called, decorator is necessary for all non-static, non-classmethods.)
"""

# pylint: disable=too-many-lines
# pylint: disable=too-many-locals
# pylint: disable=too-many-statements
# pylint: disable=too-many-return-statements
# pylint: disable=too-many-arguments
# pylint: disable=I1101

import logging
import warnings
from contextlib import nullcontext
from math import ceil

import numpy as np
import scipy as sp
import scipy.linalg as sla
import scipy.sparse.linalg as ssla

# Try to import cupy
try:
    import cupy as cp
    import cupyx.scipy.linalg as cla
    import cupyx.scipy.sparse.linalg as csla
    from cupy_backends.cuda.api.runtime import CUDARuntimeError

    try:
        _ = cp.cuda.Device()
        GPU_AVAILABLE = True
        NUM_GPUS = cp.cuda.runtime.getDeviceCount()

        # for use in isinstance
        _ArrayTypes = (np.ndarray, cp.ndarray)

    except CUDARuntimeError:
        GPU_AVAILABLE = False
        NUM_GPUS = 0

        # for use in isinstance
        _ArrayTypes = np.ndarray

except ImportError:
    cp = None
    GPU_AVAILABLE = False
    NUM_GPUS = 0

    # for use in isinstance
    _ArrayTypes = np.ndarray

from qtealeaves.convergence_parameters import TNConvergenceParameters
from qtealeaves.solvers import EigenSolverH
from qtealeaves.tooling import QTeaLeavesError, read_tensor, write_tensor
from qtealeaves.tooling.devices import _CPU_DEVICE, _GPU_DEVICE, DeviceList
from qtealeaves.tooling.mpisupport import TN_MPI_TYPES

from .abstracttensor import (
    _AbstractDataMover,
    _AbstractQteaBaseTensor,
    _parse_block_size,
)

# pylint: disable-next=invalid-name
_BLOCK_SIZE_BOND_DIMENSION, _BLOCK_SIZE_BYTE = _parse_block_size()

# pylint: disable-next=invalid-name
_USE_STREAMS = False

__all__ = [
    "QteaTensor",
    "DataMoverNumpyCupy",
    "_process_svd_ctrl",
    "set_block_size_qteatensors",
]

logger = logging.getLogger(__name__)


# pylint: disable-next=dangerous-default-value
def logger_warning(*args, storage=[]):
    """Workaround to display warnings only once in logger."""
    if args in storage:
        return

    storage.append(args)
    logger.warning(*args)


# Define decorator allowing multi-GPU support
if NUM_GPUS > 1:

    def gpu_switch(func):
        """
        Used as decorator on functions that executes code on device
        allowing to use `cp.cuda.Device` context magaager.
        """

        def wrapper(*args, **kwargs):
            # As used on non-static methods only, first argument is self
            self = args[0]

            # pylint: disable-next=protected-access
            gpu_idx = self._gpu_idx(self.device)

            do_open_context = gpu_idx is not None
            current = cp.cuda.runtime.getDevice()
            if current == gpu_idx:
                # Current device is target, return function directly
                do_open_context = False

            if not do_open_context:
                return func(*args, **kwargs)

            with cp.cuda.Device(gpu_idx):
                return func(*args, **kwargs)

        return wrapper

else:
    # Either CPU or single-GPU, no wrapper needed
    def gpu_switch(func):
        """Empty decorator on functions that executes code on device."""
        return func


def set_block_size_qteatensors(block_size_bond_dimension=None, block_size_byte=None):
    """
    Allows to overwrite bond dimension decisions to enhance performance
    on hardware by keeping "better" or "consistent" bond dimensions.
    Only one of the two can be used right now.

    **Arguments**

    block_size_bond_dimension : int
        Direct handling of bond dimension.

    block_size_byte : int
        Control dimension of tensors (in SVD cuts) via blocks of bytes.
        For example, nvidia docs suggest multiples of sixteen float64
        or 32 float32 for A100, i.e., 128 bytes.

    **Details**

    There are two options of injecting hardware preferences for tensor.

    1) Call this function and it will set preferences for :class:`QteaTensors`
       via numpy/cupy
    2) Set the environment variables `QTEA_BLOCK_SIZE_BOND_DIMENSION` and/or
       `QTEA_BLOCK_SIZE_BYTE` which will be parsed by all :class:`_AbstractQteaBaseTensors`
       supporting this feature.

    Note that we do not support both options at the same time, only the block
    dimension in bytes will be used if both are available.
    """
    # pylint: disable-next=invalid-name,global-statement
    global _BLOCK_SIZE_BOND_DIMENSION
    # pylint: disable-next=invalid-name,global-statement
    global _BLOCK_SIZE_BYTE

    _BLOCK_SIZE_BOND_DIMENSION = block_size_bond_dimension
    _BLOCK_SIZE_BYTE = block_size_byte

    if (block_size_bond_dimension is not None) and (block_size_byte is not None):
        # We do not want to handle both of them, will be ignored later on,
        # but raise warning as early as possible
        warnings.warn("Ignoring BLOCK_SIZE_BOND_DIMENSION in favor of BLOCK_SIZE_BYTE.")


def set_streams_qteatensors(use_streams):
    """
    Allow to decide if streams are used.

    **Arguments**

    use_streams : bool
        If True, streams will be used, otherwise we return a nullcontext even
        if streams would be possible.

    """
    # pylint: disable-next=invalid-name,global-statement
    global _USE_STREAMS

    _USE_STREAMS = use_streams


# class set_block_size_qteatensors once to resolve if both variables
# are set
set_block_size_qteatensors(_BLOCK_SIZE_BOND_DIMENSION, _BLOCK_SIZE_BYTE)

_CPU_DEVICE = "cpu"
_GPU_DEVICE = "gpu"


# pylint: disable-next=too-many-public-methods
[docs] class QteaTensor(_AbstractQteaBaseTensor): """ Dense tensor for Quantum Tea simulations using numpy or cupy as underlying arrays and linear algebra. **Arguments** links : list of integers Dimension along each link. ctrl : str | scalar, optional Initialization of tensor. Valid are "N" (uninitialized array), "Z" (zeros) "R", "random" (random), "O" (ones), "1" or "eye" for rank-2 identity matrix, "ground" for first element equal to one, or `None` (elem completely not set), `scalar` (the tensor is filled with that scalar value, must pass np.isscalar and np.isreal checks). Default to "Z" are_links_outgoing : list of bools, optional Used in symmetric tensors only base_tensor_cls : valid dense quantum tea tensor or `None`, optional Used in symmetric tensors only dtype : data type, optional Data type for numpy or cupy. Default to np.complex128 device : device specification, optional Default to `"cpu"`. Available: `"cpu", "gpu"` **Details** Mixed-device mode The mixed-device mode is enabled for this class allowing for devices like `cpu+gpu:1`. The mixed-device mode can be used for :class:`QteaTensor`. Different GPUs can currently not be accessed from within a thread spanned inside simulations (other backends might support this). Tensor operation on the GPU have to be on the active cupy GPU, which we resolve by a decorator setting the active GPU in a context manager for every call. Unfortunately, not all operations can be executed on the selected device, i.e., if they happen outside the tensors module. Such operations are for example in the `qtealeaves.simulation` submodule; return value can be cupy-arrays with direct call of GPU kernels on the default GPU. In this case, a warning is displayed. """ implemented_devices = DeviceList([_CPU_DEVICE, _GPU_DEVICE]) def __init__( self, links, ctrl="Z", are_links_outgoing=None, # pylint: disable=unused-argument base_tensor_cls=None, # pylint: disable=unused-argument dtype=np.complex128, device=None, ): """ links : list of ints with shape (links works towards generalization) """ super().__init__(links) self._device = device xp = self._device_checks() with self._gpu_idx_context(device): if ctrl is None: self._elem = None elif ctrl in ["N"]: self._elem = xp.ndarray(links, dtype=dtype) elif ctrl in ["O"]: self._elem = xp.ones(links, dtype=dtype) elif ctrl in ["Z"]: self._elem = xp.zeros(links, dtype=dtype) elif ctrl in ["1", "eye"]: if len(links) != 2: raise ValueError("Initialization with identity only for rank-2.") if links[0] != links[1]: raise ValueError( "Initialization with identity only for square matrix." ) self._elem = xp.eye(links[0], dtype=dtype) elif (ctrl in ["R", "random"]) and (dtype in [xp.complex64, xp.complex128]): # Random numbers and complex self._elem = xp.random.rand(*links) + 1j * xp.random.rand(*links) self.convert(dtype, device) elif ctrl in ["R", "random"]: # Random and real numbers self._elem = xp.random.rand(*links) self.convert(dtype, device) elif ctrl in ["ground"]: dim = np.prod(links) self._elem = xp.zeros([dim], dtype=dtype) self._elem[0] = 1.0 self._elem = self._elem.reshape(links) elif np.isscalar(ctrl) and np.isreal(ctrl): dim = np.prod(links) # NOTE: xp.repeat breaks with python's float and cupy; is # there a better way to do this? self._elem = xp.ones(dim) * ctrl self._elem = self._elem.reshape(links).astype(dtype) else: raise QTeaLeavesError(f"Unknown initialization {ctrl}.") # -------------------------------------------------------------------------- # Properties # -------------------------------------------------------------------------- @property def are_links_outgoing(self): """Define property of outgoing links as property (always False).""" return [False] * self.ndim @property def device(self): """Device where the tensor is stored.""" return self._device @property def elem(self): """Elements of the tensor.""" return self._elem @property def dtype(self): """Data type of the underlying arrays.""" return self._elem.dtype @property def dtype_eps(self): """Data type's machine precision.""" eps_dict = { "float16": 1e-3, "float32": 1e-7, "float64": 1e-14, "complex64": 1e-7, "complex128": 1e-14, } return eps_dict[str(self.dtype)] @property def linear_algebra_library(self): """Specification of the linear algebra library used as string `numpy-cupy``.""" return "numpy-cupy" @property def links(self): """Here, as well dimension of tensor along each dimension.""" return self.shape @property def ndim(self): """Rank of the tensor.""" return self._elem.ndim @property def shape(self): """Dimension of tensor along each dimension.""" return self._elem.shape # -------------------------------------------------------------------------- # Overwritten operators # -------------------------------------------------------------------------- # # inherit def __eq__ # inherit def __ne__ @gpu_switch def __add__(self, other): """ Addition of a scalar to a tensor adds it to all the entries. If other is another tensor, elementwise addition if they have the same shape """ new_tensor = self.copy() if np.isscalar(other): new_tensor._elem += other elif isinstance(other, QteaTensor): new_tensor._elem += other.elem else: raise TypeError( "Addition for QteaTensor is defined only for scalars and QteaTensor," + f" not {type(other)}" ) return new_tensor @gpu_switch def __iadd__(self, other): """In-place addition of tensor with tensor or scalar (update).""" if np.isscalar(other): self._elem += other elif isinstance(other, QteaTensor): self._elem += other.elem else: raise TypeError( "Addition for QteaTensor is defined only for scalars and QteaTensor," + f" not {type(other)}" ) return self @gpu_switch def __mul__(self, scalar): """Multiplication of tensor with scalar returning new tensor as result.""" return QteaTensor.from_elem_array( scalar * self._elem, dtype=self.dtype, device=self.device ) @gpu_switch def __imul__(self, scalar): """In-place multiplication of tensor with scalar (update).""" self._elem *= scalar return self @gpu_switch def __itruediv__(self, scalar): """In-place division of tensor with scalar (update).""" if scalar == 0 or np.isinf(scalar) or np.isnan(scalar): raise QTeaLeavesError("Trying to divide by zero.") self._elem /= scalar return self @gpu_switch def __sub__(self, other): """ Subtraction of a scalar to a tensor subtracts it to all the entries. If other is another tensor, elementwise subtraction if they have the same shape """ new_tensor = self.copy() if np.isscalar(other): new_tensor._elem -= other elif isinstance(other, QteaTensor): new_tensor._elem -= other.elem else: raise TypeError( "Addition for QteaTensor is defined only for scalars and QteaTensor," + f" not {type(other)}" ) return new_tensor @gpu_switch def __truediv__(self, sc): """Division of tensor with scalar.""" if sc == 0: raise QTeaLeavesError("Trying to divide by zero.") elem = self._elem / sc return QteaTensor.from_elem_array(elem, dtype=self.dtype, device=self.device) @gpu_switch def __neg__(self): """Negative of a tensor returned as a new tensor.""" # pylint: disable-next=invalid-unary-operand-type neg_elem = -self._elem return QteaTensor.from_elem_array( neg_elem, dtype=self.dtype, device=self.device ) # -------------------------------------------------------------------------- # classmethod, classmethod like # --------------------------------------------------------------------------
[docs] @staticmethod def convert_operator_dict( op_dict, params=None, symmetries=None, generators=None, base_tensor_cls=None, dtype=np.complex128, device=_CPU_DEVICE, ): """ Iterate through an operator dict and convert the entries. Converts as well to rank-4 tensors. **Arguments** op_dict : instance of :class:`TNOperators` Contains the operators as xp.ndarray. params : dict, optional To resolve operators being passed as callable. symmetries: list, optional, for compatibility with symmetric tensors. Must be empty list. generators : list, optional, for compatibility with symmetric tensors. Must be empty list. base_tensor_cls : None, optional, for compatibility with symmetric tensors. No checks on this one here. dtype : data type for xp, optional Specify data type. Default to `np.complex128` device : str Device for the simulation. Available "cpu" and "gpu" Default to "cpu" **Details** The conversion to rank-4 tensors is useful for future implementations, either to support adding interactions with a bond dimension greater than one between them or for symmetries. We add dummy links of dimension one. The order is (dummy link to the left, old link-1, old link-2, dummy link to the right). """ if params is None: params = {} if symmetries is None: symmetries = [] if generators is None: generators = [] if len(symmetries) != 0: raise QTeaLeavesError("Symmetries not supported, but symmetry given.") if len(generators) != 0: raise QTeaLeavesError("Symmetries not supported, but generators given.") def transformation(key, value, op_dict=op_dict, params=params): if isinstance(value, QteaTensor): tensor = value else: tensor = op_dict.get_operator(*key, params) tensor = QteaTensor.from_elem_array(tensor, dtype=dtype, device=device) if tensor.ndim == 2: tensor.attach_dummy_link(0) tensor.attach_dummy_link(3) return tensor new_op_dict = op_dict.transform(transformation) return new_op_dict
[docs] @gpu_switch def copy(self, dtype=None, device=None): """Make a copy of a tensor.""" if dtype is None: dtype = self.dtype if device is None: device = self.device tensor = self.from_elem_array(self._elem.copy(), dtype=dtype, device=device) return tensor
[docs] @gpu_switch def eye_like(self, link): """ Generate identity matrix. **Arguments** self : instance of :class:`QteaTensor` Extract data type etc from this one here. link : same as returned by `links` property, here integer. Dimension of the square, identity matrix. """ xp = self._device_checks() elem = xp.eye(link) return self.from_elem_array(elem, dtype=self.dtype, device=self.device)
[docs] @gpu_switch def random_unitary(self, links): """ Generate a random unitary matrix via performing a QR on a random tensor. **Arguments** self : instance of :class:`QteaTensor` Extract data type etc from this one here. links : same as returned by `links` property, here integer. Dimension of the tensors as [link[0], .., link[-1], link[0], .., link[-1]], random unitary matrix for contracting first/last half of legs with itself. """ xp = self._device_checks() dim = np.prod(links) elem = xp.random.rand(dim, dim) elem = xp.linalg.qr(elem)[0].reshape(2 * links) return self.from_elem_array(elem, dtype=self.dtype, device=self.device)
# no gpu_switch necessary, will be handled inside convert / from_elem_array
[docs] @classmethod def read(cls, filehandle, dtype, device, base_tensor_cls, cmplx=True, order="F"): """Read a tensor from file.""" elem = read_tensor(filehandle, cmplx=cmplx, order=order) return cls.from_elem_array(elem, dtype=dtype, device=device)
# -------------------------------------------------------------------------- # Checks and asserts # -------------------------------------------------------------------------- # # inherit def assert_normalized # inherit def assert_unitary # inherit def sanity_check
[docs] @gpu_switch def are_equal(self, other, tol=1e-7): """Check if two tensors are equal.""" xp = self._device_checks() if self.ndim != other.ndim: return False if np.any(self.shape != other.shape): return False return xp.isclose(self._elem, other.elem, atol=tol).all()
[docs] @gpu_switch def assert_identity(self, tol=1e-7): """Check if tensor is an identity matrix.""" if not self.is_close_identity(tol=tol): raise QTeaLeavesError("Tensor not diagonal with ones.", self._elem)
[docs] @gpu_switch def is_close_identity(self, tol=1e-7): """Check if rank-2 tensor is close to identity.""" xp = self._device_checks() if self.ndim != 2: return False if self.shape[0] != self.shape[1]: return False eye = xp.eye(self.shape[0]) eps = (np.abs(eye - self._elem)).max() return eps < tol
# Overwrite implementation going via char
[docs] def is_dtype_complex(self): """Check if data type is complex.""" xp = self._device_checks() return xp.issubdtype(self._elem.dtype, xp.complexfloating)
[docs] def is_implemented_device(self, query): """ Check if argument query is an implemented device. Parameters ---------- query : str String to be tested if it corresponds to a device implemented with this tensor. Returns ------- is_implemented : bool True if string is available as device. """ return query in self.implemented_devices
# -------------------------------------------------------------------------- # Single-tensor operations # -------------------------------------------------------------------------- # # inherit def flip_links_update
[docs] @gpu_switch def conj(self): """Return the complex conjugated in a new tensor.""" return QteaTensor.from_elem_array( self._elem.conj(), dtype=self.dtype, device=self.device )
[docs] @gpu_switch def conj_update(self): """Apply the complex conjugated to the tensor in place.""" xp = self._device_checks() xp.conj(self._elem, out=self._elem)
# pylint: disable-next=too-many-branches
[docs] def convert(self, dtype=None, device=None, stream=None): """ Convert underlying array to the specified data type and device inplace. Parameters ---------- dtype : np.dtype, optional Type to which you want to convert. If None, no conversion. Default to None. device : str, optional Device where you want to send the QteaTensor. If None, no conversion. Default to None. stream : None | bool | cp.cuda.Stream | etc If None, use default stream for memory communication. If boolean, new stream is used for `True`. If not None and bool, use a new stream for memory communication. Default to None (Use null stream). """ # To simplify syntax below, we use nullcontext if stream is # not given scontext = nullcontext() if stream is None else stream if isinstance(scontext, bool): # Some calling methods pass boolean, catch here and generate # stream on the fly blocking = not scontext # cupy's asnumpy() if will not allow stream to be bool stream = self.stream() if stream else None scontext = self.stream() if scontext else nullcontext() else: blocking = stream is None if device is not None: if not self.is_implemented_device(device): raise ValueError( f"Device {device} is not implemented. Select from" + f" {self.implemented_devices}" ) if self.is_gpu(query=device) and (not GPU_AVAILABLE): raise ImportError("CUDA GPU is not available") # Both devices available, figure out what we currently have # and start converting if isinstance(self._elem, np.ndarray): current = _CPU_DEVICE elif cp is None: current = None elif isinstance(self.elem, cp.ndarray): current = _GPU_DEVICE else: current = None if device == current: # We already are in the correct device pass elif self.is_gpu() and self.is_gpu(query=device): # According to cupy docs, GPU to GPU copy is performed via cp.asarray # https://docs.cupy.dev/en/stable/user_guide/basic.html#data-transfer # We do not open a stream here, not sure which device would be the # correct one to open a stream with self._gpu_idx_context(device): self._elem = cp.asarray(self._elem) elif self.is_gpu(query=device): with self._gpu_idx_context(device): # We run a certain risk here that the stream is not on the # correct device with scontext: # We go from cpu to gpu self._elem = cp.asarray(self._elem) elif self.is_cpu(query=device): # We run a certain risk here that the stream is not on the # correct device; asnumpy supports passing a stream # We go from gpu to cpu if isinstance(stream, nullcontext): stream = None else: # we need to wait for any computation on the default stream to finish # before we can safely copy data to the cpu cp.cuda.Stream.null.synchronize() self._elem = cp.asnumpy(self._elem, stream=stream, blocking=blocking) self._device = device if dtype is not None: if dtype != self.dtype: self._elem = self._elem.astype(dtype) return self
# pylint: disable-next=too-many-branches
[docs] def convert_singvals(self, singvals, dtype=None, device=None, stream=None): """ Convert the singular values via a tensor. Parameters ---------- dtype : np.dtype, optional Type to which you want to convert. If None, no conversion. Default to None. device : str, optional Device where you want to send the QteaTensor. If None, no conversion. Default to None. stream : None | cp.cuda.Stream If not None, use a new stream for memory communication. Default to None (Use null stream). """ xp = self._device_checks() # To simplify syntax below, we use nullcontext if stream is # not given scontext = nullcontext() if stream is None else stream blocking = stream is None if device is not None: if not self.is_implemented_device(device): raise ValueError( f"Device {device} is not implemented. Select from" + f" {self.implemented_devices}" ) if self.is_gpu(query=device) and (not GPU_AVAILABLE): raise ImportError("CUDA GPU is not available") # Both devices available, figure out what we currently have # and start converting if isinstance(singvals, np.ndarray): current = _CPU_DEVICE elif cp is None: current = None elif isinstance(singvals, cp.ndarray): current = _GPU_DEVICE else: current = None if device == current: # We already are in the correct device pass elif self.is_gpu() and self.is_gpu(query=device): # According to cupy docs, GPU to GPU copy is performed via cp.asarray # https://docs.cupy.dev/en/stable/user_guide/basic.html#data-transfer # We do not open a stream here, not sure which device would be the # correct one to open a stream with self._gpu_idx_context(device): singvals = cp.asarray(singvals) elif self.is_gpu(query=device): with self._gpu_idx_context(device): # We run a certain risk here that the stream is not on the # correct device with scontext: # We go from cpu to gpu singvals = cp.asarray(singvals) elif self.is_cpu(query=device): # We run a certain risk here that the stream is not on the # correct device; asnumpy supports passing a stream # We go from gpu to cpu singvals = cp.asnumpy(singvals, stream=stream, blocking=blocking) if dtype is not None: dtype = dtype(0).real.dtype if dtype != singvals.dtype: if xp.max(xp.abs(xp.imag(singvals))) > 10 * self.dtype_eps: raise ValueError("Singular values have imaginary part.") singvals = xp.real(singvals).astype(dtype) return singvals
[docs] @gpu_switch def diag(self, real_part_only=False, do_get=False): """ Return either the diagonal of an input rank-2 tensor or a rank-2 tensor of an input diagonal. """ xp = self._device_checks() if self.ndim not in [1, 2]: raise QTeaLeavesError("Can only run on rank-1 or rank-2.") diag = xp.diag(self._elem) if real_part_only: diag = xp.real(diag) if self.is_gpu() and do_get: diag = diag.get() return diag
# no gpu_switch necessary, only access to data types
[docs] def dtype_from_char(self, dtype): """Resolve data type from chars C, D, S, Z and optionally H.""" xp = self._device_checks() data_types = { "A": xp.complex128, "C": xp.complex64, "D": xp.float64, "H": xp.float16, "S": xp.float32, "Z": xp.complex128, "I": xp.int32, } return data_types[dtype]
# no gpu_switch necessary, only access to eigensolver function
[docs] def eig_api( self, matvec_func, links, conv_params, args_func=None, kwargs_func=None ): """ Interface to hermitian eigenproblem **Arguments** matvec_func : callable Multiplies "matrix" with "vector" links : links according to :class:`QteaTensor` Contain the dimension of the problem. conv_params : instance of :class:`TNConvergenceParameters` Settings for eigenproblem with Arnoldi method. args_func : arguments for matvec_func kwargs_func : keyword arguments for matvec_func **Returns** eigenvalues : scalar eigenvectors : instance of :class:`QteaTensor` """ _, xsla = self._device_checks(return_sla=True) try: # pylint: disable-next=invalid-name ArpackNoConvergence = xsla.ArpackNoConvergence msg_error = ( "Arpack not converging with new tolerance. Using qtealeaves solver." ) except AttributeError: # The definition of the error seems to move every second version of # numpy and cupy # pylint: disable-next=invalid-name ArpackNoConvergence = QTeaLeavesError msg_error = ( "Arpack failed (catching any exception). Using qtealeaves solver." ) kwargs, _, _ = self.prepare_eig_api(conv_params) use_qtea_solver = kwargs.pop("use_qtea_solver", False) # qtea_solver is fallback solution for Arpack, so try Arpack first if not use_qtea_solver: try: val, vec = self.eig_api_arpack( matvec_func, links, conv_params, args_func=args_func, kwargs_func=kwargs_func, ) return val, vec except ArpackNoConvergence: # We iterated already over both tolerances, now try qtea_solver use_qtea_solver = True logger.warning(msg_error) # Now, qtea_solver must be either requested or be the fallback val, vec = self.eig_api_qtea( matvec_func, conv_params, args_func=args_func, kwargs_func=kwargs_func, ) # Half precision had problems with normalization (most likely # as eigh is executed on higher precision. Insert `vec /= vec.norm_sqrt()` # again if necessary, but general normalization was covering other errors. return val, vec
# no gpu_switch necessary, only resolving abs function
[docs] def eig_api_qtea(self, matvec_func, conv_params, args_func=None, kwargs_func=None): """ Interface to hermitian eigenproblem via qtealeaves.solvers. Arguments see `eig_api`. """ xp = self._device_checks() injected_funcs = { "abs": xp.abs, } solver = EigenSolverH( self, matvec_func, conv_params, args_func=args_func, kwargs_func=kwargs_func, injected_funcs=injected_funcs, ) res = solver.solve() # Free the allocated device memory that is no longer used (only # on the device of this tensor) self.free_device_memory(device=self.device) return res
[docs] @gpu_switch def eig_api_arpack( self, matvec_func, links, conv_params, args_func=None, kwargs_func=None ): """ Interface to hermitian eigenproblem via Arpack. Arguments see `eig_api`. """ if args_func is None: args_func = [] if kwargs_func is None: kwargs_func = {} xp, xsla = self._device_checks(return_sla=True) kwargs, linear_operator, eigsh = self.prepare_eig_api(conv_params) _ = kwargs.pop("injected_funcs") _ = kwargs.pop("injected_funcs", None) _ = kwargs.pop("use_qtea_solver", None) def my_matvec( vec, func=matvec_func, this=self, links=links, xp=xp, args=args_func, kwargs=kwargs_func, ): if isinstance(vec, xp.ndarray): tens = this.from_elem_array(vec, dtype=self.dtype, device=self.device) tens.reshape_update(links) else: raise QTeaLeavesError("unknown type") tens = -func(tens, *args, **kwargs) return tens.elem.reshape(-1) ham_dim = int(np.prod(links)) lin_op = linear_operator((ham_dim, ham_dim), matvec=my_matvec) if "v0" in kwargs: kwargs["v0"] = self._elem.reshape(-1) # Even if this happens we are fine: our approach is iterative, at the next # sweep the value should converge try: eigenvalues, eigenvectors = eigsh(lin_op, **kwargs) except xsla.ArpackNoConvergence: kwargs["tol"] = conv_params.sim_params["arnoldi_max_tolerance"] logger.warning( "Arpack solver did not converge. Increasing tolerance to %s.", kwargs["tol"], ) # try-except logic is replicated on calling function, we will not try # another Arpack call here; kwargs is local, so no need to reset original # tolerance eigenvalues, eigenvectors = eigsh(lin_op, **kwargs) # Free the allocated device memory that is no longer used self.free_device_memory(device=self.device) return ( -eigenvalues, self.from_elem_array( eigenvectors.reshape(links), dtype=self.dtype, device=self.device ), )
[docs] @gpu_switch def einsum(self, einsum_str, *others): """ Call to einsum with `self` as first tensor. Arguments --------- einsum_str : str Einsum contraction rule. other: List[:class:`QteaTensors`] 2nd, 3rd, ..., n-th tensor in einsum rule as positional arguments. Results ------- tensor : :class:`QteaTensor` Contracted tensor according to the einsum rules. Details ------- The call ``np.einsum(einsum_str, x.elem, y.elem, z.elem)`` translates into ``x.einsum(einsum_str, y, z)`` for x, y, and z being :class:`QteaTensor`. """ xp = self._device_checks() # List of :class:`AbstractQteaTensors tensors = [self] + list(others) # Check for optimization level, do an educated guess here optimization_level = self.einsum_optimization_level(tensors, einsum_str) optimize = { 0: False, 1: True, 2: "optimal", }[optimization_level] # Convert to actual data type of backend # pylint: disable-next=no-member tensors = [tensor.elem for tensor in tensors] elem = xp.einsum(einsum_str, *tensors, optimize=optimize) return self.from_elem_array(elem, dtype=self.dtype, device=self.device)
# pylint: disable-next=unused-argument
[docs] @gpu_switch def getsizeof(self): """Size in memory (approximate, e.g., without considering meta data).""" # Enable fast switch, previously use sys.getsizeof had trouble # in resolving size, numpy attribute is only for array without # metadata, but metadata like dimensions is only small overhead. # (fast switch if we want to use another approach for estimating # the size of a numpy array) return self._elem.nbytes
[docs] @gpu_switch def get_entry(self): """Get entry if scalar on host.""" if np.prod(self.shape) != 1: raise QTeaLeavesError("Cannot get entry, more than one.") if self.is_gpu(): return self._elem.get().reshape(-1)[0] return self._elem.reshape(-1)[0]
[docs] @gpu_switch def kron(self, other, idxs=None): """ Perform the kronecker product between two tensors. By default, do it over all the legs, but you can also specify which legs should be kroned over. The legs over which the kron is not done should have the same dimension. Parameters ---------- other : QteaTensor Tensor to kron with self idxs : Tuple[int], optional Indexes over which to perform the kron. If None, kron over all indeces. Default to None. Returns ------- QteaTensor The kronned tensor Details ------- Performing the kronecker product between a tensor of shape (2, 3, 4) and a tensor of shape (1, 2, 3) will result in a tensor of shape (2, 6, 12). To perform the normal kronecker product between matrices just pass rank-2 tensors. To perform kronecker product between vectors first transfor them in rank-2 tensors of shape (1, -1) Performing the kronecker product only along **some** legs means that along that leg it is an elementwise product and not a kronecker. For Example, if idxs=(0, 2) for the tensors of shapes (2, 3, 4) and (1, 3, 2) the output will be of shape (2, 3, 8). """ xp = self._device_checks() if isinstance(other, xp.ndarray): other = QteaTensor.from_elem_array( other, dtype=self.dtype, device=self.device ) warnings.warn("Converting tensor on the fly.") subscipts, final_shape = self._einsum_for_kron(self.shape, other.shape, idxs) elem = xp.einsum(subscipts, self._elem, other._elem).reshape(final_shape) tens = QteaTensor.from_elem_array(elem, dtype=self.dtype, device=self.device) return tens
[docs] @classmethod def mpi_bcast(cls, tensor, comm, tensor_backend, root=0): """ Broadcast tensor via MPI. """ dtype = tensor_backend.dtype is_root = comm.Get_rank() == root # Broadcast the dim of the shape dim = tensor.ndim if is_root else 0 dim = comm.bcast(dim, root=root) # Broadcast shape shape = ( np.array(list(tensor.shape), dtype=int) if is_root else np.zeros(dim, dtype=int) ) comm.Bcast([shape, TN_MPI_TYPES["<i8"]], root=root) # Broadcast the tensor if not is_root: obj = cls(shape, ctrl="N", dtype=dtype, device=_CPU_DEVICE) # bcast will write in every process which is not root, so access _elem # pylint: disable-next=protected-access elem = obj._elem elif tensor.is_cpu(): obj = tensor elem = tensor.elem else: obj = tensor elem = obj.elem.get() dtype_mpi = obj.dtype_mpi() comm.Bcast([elem, dtype_mpi], root=root) obj.convert(device=tensor_backend.device) return obj
[docs] def mpi_send(self, to_, comm): """ Send tensor via MPI. **Arguments** to : integer MPI process to send tensor to. comm : instance of MPI communicator to be used """ # Send the dim of the shape comm.send(self.ndim, to_) # Send the shape first shape = np.array(list(self.shape), dtype=int) comm.Send([shape, TN_MPI_TYPES["<i8"]], to_) # Send the tensor if hasattr(self._elem, "get"): elem = self._elem.get() else: elem = self._elem dtype_mpi = self.dtype_mpi() comm.Send([np.ascontiguousarray(elem), dtype_mpi], to_)
[docs] @classmethod def mpi_recv(cls, from_, comm, tensor_backend): """ Send tensor via MPI. **Arguments** from_ : integer MPI process to receive tensor from. comm : instance of MPI communicator to be used tensor_backend : instance of :class:`TensorBackend` """ # Receive the number of legs ndim = comm.recv(source=from_) # Receive the shape shape = np.empty(ndim, dtype=int) comm.Recv([shape, TN_MPI_TYPES["<i8"]], from_) dtype = tensor_backend.dtype obj = cls(shape, ctrl="N", dtype=dtype, device=_CPU_DEVICE) # Receive the tensor comm.Recv([obj._elem, obj.dtype_mpi()], from_) obj.convert(dtype=dtype, device=tensor_backend.memory_device) return obj
[docs] @gpu_switch def norm(self): """Calculate the norm of the tensor <tensor|tensor>.""" xp = self._device_checks() cidxs = np.arange(self.ndim) return xp.real(xp.tensordot(self._elem, self._elem.conj(), (cidxs, cidxs)))
[docs] @gpu_switch def norm_sqrt(self): """ Calculate the square root of the norm of the tensor, i.e., sqrt( <tensor|tensor>). """ xp = self._device_checks() norm = self.norm() return xp.sqrt(norm)
[docs] @gpu_switch def normalize(self): """Normalize tensor with sqrt(<tensor|tensor>).""" self._elem /= self.norm_sqrt()
[docs] @gpu_switch def set_diagonal_entry(self, position, value): """Set the diagonal element in a rank-2 tensor (inplace update)""" if self.ndim != 2: raise QTeaLeavesError("Can only run on rank-2 tensor.") self._elem[position, position] = value
[docs] @gpu_switch def set_matrix_entry(self, idx_row, idx_col, value): """Set one element in a rank-2 tensor (inplace update)""" if self.ndim != 2: raise QTeaLeavesError("Can only run on rank-2 tensor.") self._elem[idx_row, idx_col] = value
[docs] @staticmethod def set_seed(seed, devices=None): """ Set the seed for this tensor backend and the specified devices. Arguments --------- seed : list[int] List of integers used as a seed; list has length 4. devices : list[str] | None, optional Can pass a list of devices via a string, e.g., to specify GPU by index. Default to `None` (CPU seed set, default GPU set if GPU available) """ # Cover all CPU cases np.random.seed(seed) if not GPU_AVAILABLE: return # Find single integer as seed elegant_pairing = lambda nn, mm: nn**2 + nn + mm if nn >= mm else mm * 2 + nn intermediate_a = elegant_pairing(seed[0], seed[1]) intermediate_b = elegant_pairing(seed[2], seed[3]) single_seed = elegant_pairing(intermediate_a, intermediate_b) if devices is None: # GPU available, but not specified via index cp.random.seed(single_seed) return # We have a device list devices_set = [] for device in devices: if not QteaTensor.is_gpu_static(device): continue if ":" in device: device_idx = int(device.split(":")[1]) else: device_idx = 0 if device_idx in devices_set: continue with cp.cuda.Device(device_idx): cp.random.seed(single_seed) devices_set.append(device_idx)
[docs] def to_dense(self, true_copy=False): """Return dense tensor (if `true_copy=False`, same object may be returned).""" if true_copy: return self.copy() return self
[docs] def to_dense_singvals(self, s_vals, true_copy=False): """Convert singular values to dense vector without symmetries.""" if true_copy: return s_vals.copy() return s_vals
[docs] @gpu_switch def trace(self, return_real_part=False, do_get=False): """Take the trace of a rank-2 tensor.""" xp = self._device_checks() if self.ndim != 2: raise QTeaLeavesError("Can only run on rank-2 tensor.") value = xp.trace(self._elem) if return_real_part: value = xp.real(value) if self.is_gpu() and do_get: value = value.get() return value
[docs] @gpu_switch def transpose(self, permutation): """Permute the links of the tensor and return new tensor.""" tens = QteaTensor(None, ctrl=None, dtype=self.dtype, device=self.device) # pylint: disable-next=protected-access tens._elem = self._elem.transpose(permutation) return tens
[docs] @gpu_switch def transpose_update(self, permutation): """Permute the links of the tensor inplace.""" self._elem = self._elem.transpose(permutation)
[docs] @gpu_switch def write(self, filehandle, cmplx=None): """ Write tensor in original Fortran compatible way. **Details** 1) Number of links 2) Line with link dimensions 3) Entries of tensors line-by-line in column-major ordering. """ xp = self._device_checks() if cmplx is None: cmplx = xp.sum(xp.abs(xp.imag(self.elem))) > 1e-15 write_tensor(self.elem, filehandle, cmplx=cmplx)
# -------------------------------------------------------------------------- # Two-tensor operations # --------------------------------------------------------------------------
[docs] @gpu_switch def add_update(self, other, factor_this=None, factor_other=None): """ Inplace addition as `self = factor_this * self + factor_other * other`. **Arguments** other : same instance as `self` Will be added to `self`. Unmodified on exit. factor_this : scalar Scalar weight for tensor `self`. factor_other : scalar Scalar weight for tensor `other` """ if (factor_this is None) and (factor_other is None): self._elem += other.elem return if factor_this is not None: self._elem *= factor_this if factor_other is None: self._elem += other.elem return self._elem += factor_other * other.elem
[docs] @gpu_switch def dot(self, other): """Inner product of two tensors <self|other>.""" xp = self._device_checks() return xp.vdot(self._elem.reshape(-1), other.elem.reshape(-1))
[docs] @gpu_switch def split_qr( self, legs_left, legs_right, perm_left=None, perm_right=None, is_q_link_outgoing=True, # pylint: disable=unused-argument disable_streams=False, # pylint: disable=unused-argument ): """ Split the tensor via a QR decomposition. Parameters ---------- self : instance of :class:`QteaTensor` Tensor upon which apply the QR legs_left : list of int Legs that will compose the rows of the matrix legs_right : list of int Legs that will compose the columns of the matrix perm_left : list of int, optional permutations of legs after the QR on left tensor perm_right : list of int, optional permutation of legs after the QR on right tensor disable_streams : boolean, optional No effect here, but in general can disable streams to avoid nested generation of streams. Returns ------- tens_left: instance of :class:`QteaTensor` unitary tensor after the QR, i.e., Q. tens_right: instance of :class:`QteaTensor` upper triangular tensor after the QR, i.e., R """ xp = self._device_checks() is_good_bipartition, is_sorted_l, is_sorted_r = self._split_checks_links( legs_left, legs_right ) if is_good_bipartition and is_sorted_l and is_sorted_r: d1 = np.prod(np.array(self.shape)[legs_left]) d2 = np.prod(np.array(self.shape)[legs_right]) tens_left, tens_right = self._split_qr_dim(d1, d2) k_dim = tens_right.shape[0] tens_left.reshape_update(list(np.array(self.shape)[legs_left]) + [k_dim]) tens_right.reshape_update([k_dim] + list(np.array(self.shape)[legs_right])) else: # Reshaping matrix = self._elem.transpose(legs_left + legs_right) shape_left = np.array(self.shape)[legs_left] shape_right = np.array(self.shape)[legs_right] matrix = matrix.reshape(np.prod(shape_left), np.prod(shape_right)) k_dim = np.min([matrix.shape[0], matrix.shape[1]]) if self.dtype == xp.float16: matrix = matrix.astype(xp.float32) # QR decomposition mat_left, mat_right = xp.linalg.qr(matrix) if self.dtype == xp.float16: mat_left = mat_left.astype(xp.float16) mat_right = mat_right.astype(xp.float16) # Reshape back to tensors tens_left = QteaTensor.from_elem_array( mat_left.reshape(list(shape_left) + [k_dim]), dtype=self.dtype, device=self.device, ) tens_right = QteaTensor.from_elem_array( mat_right.reshape([k_dim] + list(shape_right)), dtype=self.dtype, device=self.device, ) if perm_left is not None: tens_left.transpose_update(perm_left) if perm_right is not None: tens_right.transpose_update(perm_right) return tens_left, tens_right
[docs] @gpu_switch def split_qrte( self, tens_right, singvals_self, operator=None, conv_params=None, is_q_link_outgoing=True, # pylint: disable=unused-argument ): """ Perform an Truncated ExpandedQR decomposition, generalizing the idea of https://arxiv.org/pdf/2212.09782.pdf for a general bond expansion given the isometry center of the network on `tens_left`. It should be rather general for three-legs tensors, and thus applicable with any tensor network ansatz. Notice that, however, you do not have full control on the approximation, since you know only a subset of the singular values truncated. Parameters ---------- tens_left: xp.array Left tensor tens_right: xp.array Right tensor singvals_left: xp.array Singular values array insisting on the link to the left of `tens_left` operator: xp.array or None Operator to contract with the tensors. If None, no operator is contracted Returns ------- tens_left: ndarray left tensor after the EQR tens_right: ndarray right tensor after the EQR singvals: ndarray singular values kept after the EQR singvals_cutted: ndarray subset of thesingular values cutted after the EQR, normalized with the biggest singval """ xp = self._device_checks() if conv_params is None: conv_params = TNConvergenceParameters() logger_warning("Using default convergence parameters (QRTE).") elif not isinstance(conv_params, TNConvergenceParameters): raise ValueError( "conv_params must be TNConvergenceParameters or None, " + f"not {type(conv_params)}." ) # Trial bond dimension eta = ceil((1 + conv_params.min_expansion_qr) * self.shape[0]) # Contract the two tensors together twotensors = xp.tensordot(self._elem, tens_right.elem, (2, 0)) twotensors = xp.tensordot(xp.diag(singvals_self), twotensors, (1, 0)) # Contract with the operator if present if operator is not None: twotensors = xp.tensordot(twotensors, operator.elem, ([1, 2], [2, 3])) # For simplicity, transpose in the same order as obtained # after the application of the operator else: twotensors = twotensors.transpose(0, 3, 1, 2) # Apply first phase in expanding the bond dimension expansor = xp.eye(eta, np.prod(self.shape[:2])).reshape(eta, *self.shape[:2]) expanded_y0 = xp.tensordot(expansor, twotensors, ([1, 2], [0, 2])) expanded_y0 = expanded_y0.transpose([0, 2, 1]) # Contract with the (i+1)th site dagger first_qr = xp.tensordot(twotensors, expanded_y0.conj(), ([1, 3], [2, 1])) first_q, _ = xp.linalg.qr(first_qr.reshape(-1, first_qr.shape[2])) first_q = first_q.reshape(first_qr.shape) # Contract the new q with the i-th site. The we would need a rq decomposition. second_qr = xp.tensordot(twotensors, first_q.conj(), ([0, 2], [0, 1])) second_qr = second_qr.transpose(2, 1, 0) second_q, second_r = xp.linalg.qr(second_qr.reshape(second_qr.shape[0], -1).T) second_q = second_q.T.reshape(second_qr.shape) # To get the real R matrix I would have to transpose, but to avoid a double # transposition I simply avoid that # second_r = second_r.T # Second phase in the expansor eigvl, eigvc = xp.linalg.eigh(second_r.conj() @ second_r.T) # Singvals are sqrt of eigenvalues, and sorted in the opposite order singvals = xp.sqrt(eigvl)[::-1] # Routine to select the bond dimension cut, singvals, singvals_cutted = self._truncate_singvals(singvals) tens_right = xp.tensordot(eigvc[:cut, ::-1], second_q, ([1], [0])) # Get the last tensor tens_left = xp.tensordot(twotensors, tens_right.conj(), ([1, 3], [2, 1])) tens_left = self.from_elem_array( tens_left, dtype=self.dtype, device=self.device ) tens_right = self.from_elem_array( tens_right, dtype=self.dtype, device=self.device ) return tens_left, tens_right, singvals, singvals_cutted
# no gpu_switch necessary, see quick return via super()_split_rq using QR
[docs] def split_rq( self, legs_left, legs_right, perm_left=None, perm_right=None, is_q_link_outgoing=True, # pylint: disable=unused-argument disable_streams=False, # pylint: disable=unused-argument ): """ Split the tensor via a RQ decomposition. The abstract class defines the RQ via a QR and permutation of legs, but we highly recommend overwriting this approach with an actual RQ. Parameters ---------- self : instance of :class:`_AbstractQteaTensor` Tensor upon which apply the RQ legs_left : list of int Legs that will compose the rows of the matrix (and the R matrix) legs_right : list of int Legs that will compose the columns of the matrix (and the Q matrix) perm_left : list of int | None, optional permutations of legs after the QR on left tensor Default to `None` (no permutation) perm_right : list of int | None, optional permutation of legs after the QR on right tensor Default to `None` (no permutation) is_q_link_outgoing : int, optional Direction of link, placeholder for symmetric tensors. Default to True. disable_streams : boolean, optional No effect here, but in general can disable streams to avoid nested generation of streams. Returns ------- tens_left: instance of :class:`_AbstractQteaTensor` upper triangular tensor after the RQ, i.e., R tens_right: instance of :class:`_AbstractQteaTensor` unitary tensor after the RQ, i.e., Q. """ xp = self._device_checks() # Bug in RQ via scipy although QR via scipy seems to work. # pylint: disable-next=using-constant-test if True: # if xp != np: # cupy has no RQ # return super().split_rq(...) return super().split_rq( legs_left, legs_right, perm_left=perm_left, perm_right=perm_right, is_q_link_outgoing=is_q_link_outgoing, ) is_good_bipartition, is_sorted_l, is_sorted_r = self._split_checks_links( legs_left, legs_right ) if is_good_bipartition and is_sorted_l and is_sorted_r: d1 = np.prod(np.array(self.shape)[legs_left]) d2 = np.prod(np.array(self.shape)[legs_right]) tens_left, tens_right = self._split_rq_dim(d1, d2) k_dim = tens_right.shape[0] tens_left.reshape_update(list(np.array(self.shape)[legs_left]) + [k_dim]) tens_right.reshape_update([k_dim] + list(np.array(self.shape)[legs_right])) else: # Reshaping matrix = self._elem.transpose(legs_left + legs_right) shape_left = np.array(self.shape)[legs_left] shape_right = np.array(self.shape)[legs_right] matrix = matrix.reshape(np.prod(shape_left), np.prod(shape_right)) k_dim = np.min([matrix.shape[0], matrix.shape[1]]) if self.dtype == xp.float16: matrix = matrix.astype(xp.float32) # QR decomposition mat_left, mat_right = sla.rq(matrix, mode="economic", check_finite=True) if self.dtype == xp.float16: mat_left = mat_left.astype(xp.float16) mat_right = mat_right.astype(xp.float16) # Reshape back to tensors tens_left = QteaTensor.from_elem_array( mat_left.reshape(list(shape_left) + [k_dim]), dtype=self.dtype, device=self.device, ) tens_right = QteaTensor.from_elem_array( mat_right.reshape([k_dim] + list(shape_right)), dtype=self.dtype, device=self.device, ) if perm_left is not None: tens_left.transpose_update(perm_left) if perm_right is not None: tens_right.transpose_update(perm_right) return tens_left, tens_right
[docs] @gpu_switch # pylint: disable-next=unused-argument, too-many-branches def split_svd( self, legs_left, legs_right, perm_left=None, perm_right=None, contract_singvals="N", conv_params=None, no_truncation=False, is_link_outgoing_left=True, disable_streams=False, ): """ Perform a truncated Singular Value Decomposition by first reshaping the tensor into a legs_left x legs_right matrix, and permuting the legs of the ouput tensors if needed. If the contract_singvals = ('L', 'R') it takes care of renormalizing the output tensors such that the norm of the MPS remains 1 even after a truncation. Parameters ---------- self : instance of :class:`QteaTensor` Tensor upon which apply the SVD legs_left : list of int Legs that will compose the rows of the matrix legs_right : list of int Legs that will compose the columns of the matrix perm_left : list of int, optional permutations of legs after the SVD on left tensor perm_right : list of int, optional permutation of legs after the SVD on right tensor contract_singvals: string, optional How to contract the singular values. 'N' : no contraction 'L' : to the left tensor 'R' : to the right tensor conv_params : :py:class:`TNConvergenceParameters`, optional Convergence parameters to use in the procedure. If None is given, then use the default convergence parameters of the TN. Default to None. no_truncation : boolean, optional Allow to run without truncation Default to `False` (hence truncating by default) disable_streams : boolean, optional No effect here, but in general can disable streams to avoid nested generation of streams. Returns ------- tens_left: instance of :class:`QteaTensor` left tensor after the SVD tens_right: instance of :class:`QteaTensor` right tensor after the SVD singvals: xp.ndarray singular values kept after the SVD singvals_cut: xp.ndarray singular values cut after the SVD, normalized with the biggest singval """ xp = self._device_checks() tensor = self._elem # Reshaping matrix = tensor.transpose(legs_left + legs_right) shape_left = np.array(tensor.shape)[legs_left] shape_right = np.array(tensor.shape)[legs_right] matrix = matrix.reshape([np.prod(shape_left), np.prod(shape_right)]) if conv_params is None: svd_ctrl = "A" max_bond_dimension = min(matrix.shape) else: svd_ctrl = conv_params.svd_ctrl max_bond_dimension = conv_params.max_bond_dimension svd_ctrl = _process_svd_ctrl( svd_ctrl, max_bond_dimension, matrix.shape, self.device, contract_singvals, ) if matrix.dtype == xp.float16: matrix = matrix.astype(xp.float32) # SVD decomposition if svd_ctrl in ("D", "V"): mat_left, singvals_tot, mat_right = self._split_svd_normal(matrix) elif svd_ctrl in ("E", "X"): mat_left, singvals_tot, mat_right = self._split_svd_eigvl( matrix, svd_ctrl, max_bond_dimension, contract_singvals, ) elif svd_ctrl in ("E+QR", "X+QR"): mat_left, singvals_tot, mat_right = self._split_svd_eigvl_qr( matrix, svd_ctrl, max_bond_dimension, contract_singvals, ) elif svd_ctrl == "R": mat_left, singvals_tot, mat_right = self._split_svd_random( matrix, max_bond_dimension ) if self.dtype == xp.float16: mat_left = mat_left.astype(xp.float16) mat_right = mat_right.astype(xp.float16) singvals_tot = singvals_tot.astype(xp.float16) # Truncation if not no_truncation: cut, singvals, singvals_cut = self._truncate_singvals( singvals_tot, conv_params ) if cut < mat_left.shape[1]: # Cutting bond dimension mat_left = mat_left[:, :cut] mat_right = mat_right[:cut, :] elif cut > mat_left.shape[1]: # Expanding bond dimension to comply with ideal hardware # settings dim = mat_left.shape[1] npad = ((0, 0), (0, cut - dim)) mat_left = xp.pad( mat_left, npad, mode="constant", constant_values=(0, 0) ) npad = ((0, cut - dim), (0, 0)) mat_right = xp.pad( mat_right, npad, mode="constant", constant_values=(0, 0) ) npad = (0, cut - dim) singvals = xp.pad( singvals, npad, mode="constant", constant_values=(0, 0) ) else: singvals = singvals_tot singvals_cut = [] # xp.array([], dtype=self.dtype) cut = len(singvals_tot) mat_left = mat_left[:, :cut] mat_right = mat_right[:cut, :] # Contract singular values if requested if svd_ctrl in ("D", "V", "R"): if contract_singvals.upper() == "L": mat_left = xp.multiply(mat_left, singvals) elif contract_singvals.upper() == "R": mat_right = xp.multiply(singvals, mat_right.T).T elif contract_singvals.upper() != "N": raise ValueError( f"Contract_singvals option {contract_singvals} is not " + "implemented. Choose between right (R), left (L) or None (N)." ) # Reshape back to tensors tens_left = mat_left.reshape(list(shape_left) + [cut]) if perm_left is not None: tens_left = tens_left.transpose(perm_left) tens_right = mat_right.reshape([cut] + list(shape_right)) if perm_right is not None: tens_right = tens_right.transpose(perm_right) # Convert into QteaTensor tens_left = self.from_elem_array( tens_left, dtype=self.dtype, device=self.device ) tens_right = self.from_elem_array( tens_right, dtype=self.dtype, device=self.device ) return tens_left, tens_right, singvals, singvals_cut
# no gpu_switch necessary, always called from within split_svd def _split_svd_normal(self, matrix): """ Normal SVD of the matrix. First try the faster gesdd iterative method. If it fails, resort to gesvd. Parameters ---------- matrix: xp.array Matrix to decompose Returns ------- xp.array Matrix U xp.array Singular values xp.array Matrix V^dagger """ xp = self._device_checks() try: mat_left, singvals_tot, mat_right = xp.linalg.svd( matrix, full_matrices=False ) except np.linalg.LinAlgError: logger.error("GESDD SVD decomposition failed. Resorting to gesvd.") mat_left, singvals_tot, mat_right = sp.linalg.svd( matrix, full_matrices=False, lapack_driver="gesvd" ) return mat_left, singvals_tot, mat_right # no gpu_switch necessary, always called from within split_svd def _split_svd_eigvl(self, matrix, svd_ctrl, max_bond_dimension, contract_singvals): """ SVD of the matrix through an eigvenvalue decomposition. Parameters ---------- matrix: xp.array Matrix to decompose svd_crtl : str If "E" normal eigenvalue decomposition. If "X" use the sparse. max_bond_dimension : int Maximum bond dimension contract_singvals: str Whhere to contract the singular values Returns ------- xp.array Matrix U xp.array Singular values xp.array Matrix V^dagger Details ------- We use ᵀ*=^†, the adjoint. - In the contract-to-right case, which means: H = AAᵀ* = USV Vᵀ*SUᵀ* = U S^2 Uᵀ* To compute SVᵀ* we have to use: A = USVᵀ* -> Uᵀ* A = S Vᵀ* - In the contract-to-left case, which means: H = Aᵀ*A = VSUᵀ* USVᵀ* = VS^2 Vᵀ* First, we are given V, but we want Vᵀ*. However, let's avoid double work. To compute US we have to use: A = USVᵀ* -> AV = US Vᵀ* = right.T.conj() (with the conjugation done in place) """ xp, xsla = self._device_checks(return_sla=True) # The left tensor is unitary if contract_singvals == "R": herm_mat = matrix @ matrix.conj().T # contract_singvals == "L", the right tensor is unitary else: herm_mat = matrix.conj().T @ matrix # We put the condition on the matrix being bigger than 2x2 # for the sparse eigensolver for stability of the arpack methods if svd_ctrl == "E" or (herm_mat.shape[0] - 1) <= 2: eigenvalues, eigenvectors = xp.linalg.eigh(herm_mat) elif svd_ctrl == "X": num_eigvl = min(herm_mat.shape[0] - 1, max_bond_dimension - 1) # Added in case bond dimension is 1 num_eigvl = max(num_eigvl, 1) eigenvalues, eigenvectors = xsla.eigsh(herm_mat, k=num_eigvl) else: raise ValueError( f"svd_ctrl = {svd_ctrl} not valid with eigenvalue decomposition" ) # Eigenvalues are sorted ascendingly, singular values descendengly # Only positive eigenvalues makes sense. Due to numerical precision, # there will be very small negative eigvl. We put them to 0. eigenvalues[eigenvalues < 0] = 0 singvals = xp.sqrt(eigenvalues[::-1][: min(matrix.shape)]) eigenvectors = eigenvectors[:, ::-1] # Taking only the meaningful part of the eigenvectors if contract_singvals == "R": left = eigenvectors[:, : min(matrix.shape)] right = left.T.conj() @ matrix else: right = eigenvectors[:, : min(matrix.shape)] left = matrix @ right right = right.T.conj() return left, singvals, right def _split_svd_eigvl_qr( self, matrix, svd_ctrl, max_bond_dimension, contract_singvals ): """ SVD of the matrix through an eigenvalue decomposition and a QR (plus some contractions). """ xp = self._device_checks() svd_ctrl_2 = svd_ctrl.replace("+QR", "") contract_singvals_2 = {"L": "R", "R": "L"}[contract_singvals] left, singvals, right = self._split_svd_eigvl( matrix, svd_ctrl_2, max_bond_dimension, contract_singvals_2 ) if contract_singvals == "L": # Iso center has to go to the left tensor right, r_mat = xp.linalg.qr(right.T) right = right.T left = xp.tensordot(left, r_mat, [[1], [0]]) else: # Iso center has to go to the right tensor left, r_mat = xp.linalg.qr(left) right = xp.tensordot(r_mat, right, [[1], [0]]) return left, singvals, right # no gpu_switch necessary, always called from within split_svd def _split_svd_random(self, matrix, max_bond_dimension): """ SVD of the matrix through a random SVD decomposition as prescribed in page 227 of Halko, Martinsson, Tropp's 2011 SIAM paper: "Finding structure with randomness: Probabilistic algorithms for constructing approximate matrix decompositions" Parameters ---------- matrix: xp.array Matrix to decompose max_bond_dimension : int Maximum bond dimension Returns ------- xp.array Matrix U xp.array Singular values xp.array Matrix V^dagger """ xp = self._device_checks() # pylint: disable-next=nested-min-max rank = min(max_bond_dimension, min(matrix.shape)) # This could be parameterized but in the paper they use this # value n_samples = 2 * rank random = xp.random.randn(matrix.shape[1], n_samples).astype(matrix.dtype) reduced_matrix = matrix @ random # Find orthonormal basis ortho, _ = xp.linalg.qr(reduced_matrix) # Second part to_svd = ortho.T @ matrix left_tilde, singvals, right = xp.linalg.svd(to_svd, full_matrices=False) left = ortho @ left_tilde return left, singvals, right
[docs] @gpu_switch def tensordot(self, other, contr_idx, disable_streams=False): """Tensor contraction of two tensors along the given indices.""" xp = self._device_checks() tmp_other_device = None if isinstance(other, _ArrayTypes): # numpy and cupy arrays are copied to device of 'self' as tensors other = QteaTensor.from_elem_array( other, dtype=self.dtype, device=self.device ) logger.warning("Converting tensor on the fly.") elif isinstance(other, QteaTensor): # move tensor 'other' to the device of 'self', if needed tmp_other_device = other.device other.convert(device=self.device) if other.device != tmp_other_device: logger.warning( "Switching tensor device on the fly. (%s -> %s)", tmp_other_device, other.device, ) elem = xp.tensordot(self._elem, other._elem, contr_idx) tens = QteaTensor.from_elem_array(elem, dtype=self.dtype, device=self.device) # move 'other' back to original device if tmp_other_device is not None: other.convert(device=tmp_other_device) return tens
[docs] @gpu_switch def stream(self, disable_streams=False): """ Define a stream for any operation Parameters ---------- disable_streams : bool, optional Allows to disable streams to avoid nested creation of streams. Globally, streams should be disabled via the `set_streams_qteatensors` function of the base tensor module. Default to False. Returns ------- Context manager, e.g., :class:`to.cuda.Stream` if on GPU :class:`nullcontext(AbstractContextManager)` otherwise Details ------- For multi-GPU applications, the device of the stream is selected as the device of the tensor at the time of the call. If one changes the device of the tensor `self` after creating the stream, the tensor's device and the stream's device will mismatch. """ if _USE_STREAMS and (not disable_streams) and self.is_gpu(): return cp.cuda.Stream() return nullcontext()
# -------------------------------------------------------------------------- # Internal methods # -------------------------------------------------------------------------- # # inherit _invert_link_selection # -------------------------------------------------------------------------- # MISC # -------------------------------------------------------------------------- # Needed for symmetric tensors, delete after you have changed qredtea
[docs] @gpu_switch def set_subtensor_entry(self, corner_low, corner_high, tensor): """ Set a subtensor (potentially expensive as looping explicitly, inplace update). **Arguments** corner_low : list of ints The lower index of each dimension of the tensor to set. Length must match rank of tensor `self`. corner_high : list of ints The higher index of each dimension of the tensor to set. Length must match rank of tensor `self`. tensor : :class:`QteaTensor` Tensor to be set as subtensor. Rank must match tensor `self`. Dimensions must match `corner_high - corner_low`. **Examples** To set the tensor of shape 2x2x2 in a larger tensor `self` of shape 8x8x8 the corresponing call is in comparison to a numpy syntax: * self.set_subtensor_entry([2, 4, 2], [4, 6, 4], tensor) * self[2:4, 4:6, 2:4] = tensor Or with variables and rank-3 tensors * self.set_subtensor_entry([a, b, c], [d, e, f], tensor) * self[a:d, b:e, c:f] = tensor This function is here because of the symmetric tensors. As a developer, ask yourself: Do you really need to use it? Consider using Numpy-like tensor slicing instead if you are sure to have always _AbstractQteaBaseTensors. """ logger_warning("Using deprecated `set_subtensor_entry`.") lists = [] for ii, corner_ii in enumerate(corner_low): if corner_high[ii] - corner_ii == 1: lists.append(corner_ii) else: lists.append(slice(corner_ii, corner_high[ii], 1)) self._elem[*lists] = tensor.elem
[docs] @staticmethod def get_default_datamover(): """The default datamover compatible with this class.""" return DataMoverNumpyCupy()
[docs] @gpu_switch def mask_to_device(self, mask): """ Send a mask to the device where the tensor is. (right now only CPU --> GPU, CPU --> CPU). """ if self.is_cpu(): return mask xp = self._device_checks() mask_on_device = xp.array(mask, dtype=bool) return mask_on_device
[docs] def mask_to_host(self, mask): """ Send a mask to the host. (right now only CPU --> GPU, CPU --> CPU). """ if self.is_cpu(): return mask return cp.asnumpy(mask)
# no gpu_switch necessary, always called from scale_link_update def _einsum_inplace(self, xp, *args): """Short-cut to inplace-einsum resolving numpy vs cupy. Sets self._elem.""" if xp == np: xp.einsum(*args, out=self._elem) else: self._elem = xp.einsum(*args) # no gpu_switch necessary, going towards CPU
[docs] def get(self): """Get the whole array of a tensor to the host as tensor.""" if hasattr(self._elem, "get"): return self.from_elem_array(self._elem.get()) return self
# no gpu_switch necessary, going towards CPU
[docs] def get_of(self, variable): """Run the get method to transfer to host on variable (same device as self).""" if hasattr(variable, "get"): return variable.get() return variable
# no gpu_switch necessary, no direct cupy calls def _shift_iso_to_qr(self, target_tens, source_link, target_link): """Method to shift isometry center between two tensors.""" nn = len(self.shape) lnk = source_link s_perm = list(range(lnk)) + list(range(lnk + 1, nn)) + [lnk] q_perm = list(range(lnk)) + [nn - 1] + list(range(lnk, nn - 1)) tmp = self.transpose(s_perm) dim = list(np.arange(tmp.ndim)) left_mat, right_mat = tmp.split_qr(dim[:-1], dim[-1:], perm_left=q_perm) tmp = target_tens.tensordot(right_mat, ([target_link], [1])) nn = len(target_tens.shape) lnk = target_link t_perm = list(range(lnk)) + [nn - 1] + list(range(lnk, nn - 1)) t_tens = tmp.transpose(t_perm) return left_mat, t_tens
[docs] @gpu_switch def eigvalsh(self): """Calculate eigendecomposition for a rank-2 tensor.""" xp = self._device_checks() if self.ndim != 2: raise QTeaLeavesError("Not a matrix, hence no eigvalsh possible.") eigvals = xp.linalg.eigvalsh(self._elem) return eigvals
[docs] def sqrtm(self): """Calculate matrix-square-root for a rank-2 tensor.""" if self.ndim != 2: raise QTeaLeavesError("Not a matrix, hence no sqrtm possible.") if self.is_gpu(): raise QTeaLeavesError("sqrtm not implemented on the GPU through cupy") elem = sp.linalg.sqrtm(self._elem) return self.from_elem_array(elem, dtype=self.dtype, device=self.device)
[docs] def expm(self, fuse_point=None, prefactor=1): """ Take the matrix exponential with a scalar prefactor, i.e., Exp(prefactor * self). Parameters ---------- fuse_point : int, optional If given, reshapes the tensor into a matrix by fusing links up to INCLUDING fuse_point into one, and links after into the second dimension. To compute the exponential of a 4-leg tensor, for example, by fusing (0,1),(2,3), set fuse_point=1. Default is None. prefactor : float, optional Prefactor of the tensor to be exponentiated. Default to 1. Return ------ mat : instance of :class:`QteaTensor` Exponential of input tensor. """ xp = self._device_checks() xla = sla if xp == np else cla mat = self.copy() original_shape = mat.shape # Fuse the links. if fuse_point is not None: mat.fuse_links_update( fuse_low=0, fuse_high=fuse_point, is_link_outgoing=False ) mat.fuse_links_update( fuse_low=1, fuse_high=mat.ndim - 1, is_link_outgoing=True ) # Take the exponent and reshape back into the original shape. # pylint: disable-next=protected-access mat._elem = xla.expm(prefactor * mat.elem) mat.reshape_update(original_shape) return mat
[docs] def eig(self): """ Compute eigenvalues and eigenvectors of a two-leg tensor Return ------ eigvals, eigvecs : instances of :class:`QteaTensor` Eigenvalues and corresponding eigenvectors of input tensor. """ if self.is_gpu(): # NOTE: 'cupyx.scipy.linalg' has no attribute 'eig' # so we handle this case as follows: device = self.device self.convert(device="cpu") eigvals, eigvecs = self.eig() self.convert(device=device) eigvals.convert(device=device) eigvecs.convert(device=device) return eigvals, eigvecs xp = self._device_checks() xla = sla if xp == np else cla if self.ndim != 2: raise QTeaLeavesError("Works only with two-leg tensor") eigvals, eigvecs = xla.eig( self.elem ) # NOTE: 'cupyx.scipy.linalg' has no attribute 'eig' eigvals = self.from_elem_array(eigvals, dtype=self.dtype, device=self.device) eigvecs = self.from_elem_array(eigvecs, dtype=self.dtype, device=self.device) return eigvals, eigvecs
[docs] @staticmethod def static_is_gpu_available(): """Returns flag if GPU is available for this tensor class.""" return GPU_AVAILABLE
[docs] def is_gpu_available(self): """Returns flag if GPU is available for this tensor class.""" return self.static_is_gpu_available()
[docs] @staticmethod def free_device_memory(device=None): """ Free the unused device memory that is otherwise occupied by the cache. Otherwise cupy will keep the memory occupied for caching reasons. For multi-GPU, all devices will be cleared unless you specify the device. Parameters ---------- device : str | None If present, i.e., not None, only the corresponding device will be freed. The device is a string, e.g., "gpu:0" """ if device is None: inds = ["gpu:%d" % (ii) for ii in range(NUM_GPUS)] elif QteaTensor.is_cpu_static(device): return None else: inds = [device] for device_ii in inds: # Use the context provided by the QteaTensor class # to catch case of current device with null context # pylint: disable-next=protected-access with QteaTensor._gpu_idx_context(device_ii): mempool = cp.get_default_memory_pool() pinned_mempool = cp.get_default_pinned_memory_pool() mempool.free_all_blocks() pinned_mempool.free_all_blocks() return None
# -------------------------------------------------------------------------- # Methods needed for _AbstractQteaBaseTensor # --------------------------------------------------------------------------
[docs] @gpu_switch def assert_diagonal(self, tol=1e-7): """Check that tensor is a diagonal matrix up to tolerance.""" xp = self._device_checks() if self.ndim != 2: raise QTeaLeavesError("Not a matrix, hence not the identity.") tmp = xp.diag(xp.diag(self._elem)) tmp -= self._elem if xp.abs(tmp).max() > tol: raise QTeaLeavesError("Matrix not diagonal.") return
[docs] @gpu_switch def assert_int_values(self, tol=1e-7): """Check that there are only integer values in the tensor.""" xp = self._device_checks() # .copy() was required as bugfix, although unclear why (dj) tmp = xp.round(self.copy().elem) tmp -= self._elem if xp.abs(tmp).max() > tol: raise QTeaLeavesError("Matrix is not an integer matrix.") return
[docs] @gpu_switch def assert_real_valued(self, tol=1e-7): """Check that all tensor entries are real-valued.""" xp = self._device_checks() tmp = xp.imag(self._elem) if xp.abs(tmp).max() > tol: raise QTeaLeavesError("Tensor is not real-valued.")
[docs] @gpu_switch def elementwise_abs_smaller_than(self, value): """Return boolean if each tensor element is smaller than `value`""" xp = self._device_checks() return (xp.abs(self._elem) < value).all()
[docs] @gpu_switch def flatten(self): """Returns flattened version (rank-1) of dense array in native array type.""" return self._elem.flatten()
# no gpu_switch necessary, will be handled inside convert
[docs] @classmethod def from_elem_array(cls, tensor, dtype=None, device=None): """ New tensor from array **Arguments** tensor : xp.ndarray Array for new tensor. dtype : data type, optional Can allow to specify data type. If not `None`, it will convert. Default to `None` """ if dtype is None and np.issubdtype(tensor.dtype, np.integer): logger_warning( "Initializing a tensor with integer dtype can be dangerous " "for the simulation. Please specify the dtype keyword in the " "from_elem_array method if it was not intentional." ) if cp is None: current_device = _CPU_DEVICE else: current_device = ( _CPU_DEVICE if cp.get_array_module(tensor) == np else _GPU_DEVICE ) if dtype is None: dtype = tensor.dtype if (device is None) and (cp is None): # cupy not available device = _CPU_DEVICE elif (device is None) and not GPU_AVAILABLE: # Well, cupy is there, but no GPU device = _CPU_DEVICE elif device is None: # We can actually check with cp where we are running device = current_device # __init__ has no convert, it will set the _device attribute # without checks, so it has to be true (instead, data type goes via # array) obj = cls(tensor.shape, ctrl=None, dtype=None, device=current_device) obj._elem = tensor obj.convert(dtype, device) return obj
[docs] def get_attr(self, *args): """High-risk resolve attribute for an operation on an elementary array.""" xp = self._device_checks() attributes = [] for elem in args: if elem in ["linalg.eigh", "eigh"]: attributes.append(xp.linalg.eigh) continue if not hasattr(xp, elem): raise QTeaLeavesError( f"This tensor's elementary array does not support {elem}." ) attributes.append(getattr(xp, elem)) if NUM_GPUS > 1: attributes = self._get_attr_with_gpu_switch(attributes) if len(attributes) == 1: return attributes[0] return tuple(attributes)
def _get_attr_with_gpu_switch(self, attributes): """ Wrap functions from get_attr into device of the current tensor. Arguments --------- attributes : list[callable] List of requested attribute functions. Returns ------- attributes : list[callable] List of requested attribute functions running on specific GPU of current tensor if needed. """ gpu_idx = self._gpu_idx(self.device) if gpu_idx is None: # Not on GPU, return attributes directly return attributes current = cp.cuda.runtime.getDevice() if current == gpu_idx: # Current device is target, return attributes directly return attributes attributes_on_device = [] for elem in attributes: def func_on_device(*args, func=elem, gpu_idx=gpu_idx, **kwargs): with cp.cuda.Device(gpu_idx): func(*args, **kwargs) attributes_on_device.append(func_on_device) return attributes_on_device
[docs] @gpu_switch def get_diag_entries_as_int(self): """Return diagonal entries of rank-2 tensor as integer on host.""" xp = self._device_checks() if self.ndim != 2: raise QTeaLeavesError("Not a matrix, cannot get diagonal.") tmp = xp.diag(self._elem) if self.is_gpu(): tmp = tmp.get() return xp.real(tmp).astype(int)
[docs] @gpu_switch def get_submatrix(self, row_range, col_range): """Extract a submatrix of a rank-2 tensor for the given rows / cols.""" if self.ndim != 2: raise QTeaLeavesError("Cannot only set submatrix for rank-2 tensors.") row1, row3 = row_range col1, col2 = col_range return self.from_elem_array( self._elem[row1:row3, col1:col2], dtype=self.dtype, device=self.device )
[docs] @gpu_switch def permute_rows_cols_update(self, inds): """Permute rows and columns of rank-2 tensor with `inds`. Inplace update.""" if self.ndim != 2: raise QTeaLeavesError("Can only permute rows & cols for rank-2 tensors.") tmp = self._elem[inds, :][:, inds] self._elem *= 0.0 self._elem += tmp return self
# no gpu_switch necessary, no actual calls with cupy
[docs] def prepare_eig_api(self, conv_params): """ Return xp variables for eigsh. **Returns** kwargs : dict Keyword arguments for eigs call. If initial guess can be passed, key "v0" is set with value `None` LinearOperator : callable Function generating a LinearOperator eigsh : callable Interface with actual call to eigsh """ xp, xsla = self._device_checks(return_sla=True) tolerance = conv_params.sim_params["arnoldi_tolerance"] if tolerance is None: tolerance = conv_params.sim_params["arnoldi_min_tolerance"] kwargs = { "k": 1, "which": "LA", "ncv": None, "maxiter": conv_params.arnoldi_maxiter, "tol": tolerance, "return_eigenvectors": True, "use_qtea_solver": False, } if self.is_cpu(): kwargs["v0"] = None if self.dtype == xp.float16: kwargs["use_qtea_solver"] = True if self.is_dtype_complex() and (np.prod(self.shape) == 2): # scipy eigsh switches for complex data types to eigs and # can only solve k eigenvectors of a nxn matrix with # k < n - 1. This leads to problems with 2x2 matrices # where one can get not even one eigenvector. kwargs["use_qtea_solver"] = True # abs is no attribute, only function kwargs["injected_funcs"] = {"abs": xp.abs} return kwargs, xsla.LinearOperator, xsla.eigsh
[docs] @gpu_switch def reshape(self, shape, **kwargs): """Reshape a tensor.""" elem = self._elem.reshape(shape, **kwargs) tens = QteaTensor.from_elem_array(elem, dtype=self.dtype, device=self.device) return tens
[docs] @gpu_switch def reshape_update(self, shape, **kwargs): """Reshape tensor dimensions inplace.""" self._elem = self._elem.reshape(shape)
[docs] @gpu_switch def set_submatrix(self, row_range, col_range, tensor): """Set a submatrix of a rank-2 tensor for the given rows / cols.""" if self.ndim != 2: raise QTeaLeavesError("Cannot only set submatrix for rank-2 tensors.") row1, row3 = row_range col1, col2 = col_range self._elem[row1:row3, col1:col2] = tensor.elem.reshape(row3 - row1, col2 - col1)
def _truncate_decide_chi( self, chi_now, chi_by_conv, chi_by_trunc, chi_min, ): """ Decide on the bond dimension based on the various values chi and potential hardware preference indicated. **Arguments** chi_now : int Current value of the bond dimension chi_by_conv : int Maximum bond dimension as suggested by convergence parameters. chi_by_trunc : int Bond dimension suggested by truncating (either ratio or norm). chi_min : int Minimum bond dimension under which we do not want to go below. For example, used in TTN algorithms. """ return self._truncate_decide_chi_static( chi_now, chi_by_conv, chi_by_trunc, chi_min, _BLOCK_SIZE_BOND_DIMENSION, _BLOCK_SIZE_BYTE, self.elem.itemsize, ) # no gpu_swicth necessary, only calls from split_qrte and split_svd def _truncate_singvals(self, singvals, conv_params=None): """ Truncate the singular values followling the strategy selected in the convergence parameters class Parameters ---------- singvals : np.ndarray Array of singular values conv_params : :py:class:`TNConvergenceParameters`, optional Convergence parameters to use in the procedure. If None is given, then use the default convergence parameters of the TN. Default to None. Returns ------- cut : int Number of singular values kept singvals_kept : np.ndarray Normalized singular values kept singvals_cutted : np.ndarray Normalized singular values cutted """ if conv_params is None: conv_params = TNConvergenceParameters() logger_warning("Using default convergence parameters.") elif not isinstance(conv_params, TNConvergenceParameters): raise ValueError( "conv_params must be TNConvergenceParameters or None, " + f"not {type(conv_params)}." ) if conv_params.trunc_method == "R": cut = self._truncate_sv_ratio(singvals, conv_params) elif conv_params.trunc_method == "N": cut = self._truncate_sv_norm(singvals, conv_params) else: raise QTeaLeavesError(f"Unkown trunc_method {conv_params.trunc_method}") # Divide singvals in kept and cut singvals_kept = singvals[:cut] singvals_cutted = singvals[cut:] # Renormalizing the singular values vector to its norm # before the truncation norm_kept = (singvals_kept**2).sum() norm_trunc = (singvals_cutted**2).sum() normalization_factor = np.sqrt(norm_kept) / np.sqrt(norm_kept + norm_trunc) singvals_kept /= normalization_factor # Renormalize cut singular values to track the norm loss singvals_cutted /= np.sqrt(norm_trunc + norm_kept) return cut, singvals_kept, singvals_cutted
[docs] @gpu_switch def vector_with_dim_like(self, dim, dtype=None): """Generate a vector in the native array of the base tensor.""" xp = self._device_checks() if dtype is None: dtype = self.dtype return xp.ndarray(dim, dtype=dtype)
# -------------------------------------------------------------------------- # Internal methods (not required by abstract class) # -------------------------------------------------------------------------- def _device_checks(self, return_sla=False): """ Check if all the arguments of the function where _device_checks is called are on the correct device, select the correct Parameters ---------- device : str Device where the computation should take place. If called inside an emulator it should be the emulator device return_sla : bool, optional If True, returns the handle to the sparse linear algebra. Either sp.sparse.linalg or cp.scipy.sparse.linalg. Default to False. Returns ------- module handle cp if the device is GPU np if the device is CPU """ if self.device is None: raise QTeaLeavesError("None is only valid device in conversion.") if self.is_cpu() or not GPU_AVAILABLE: xp = np xsla = ssla else: xp = cp xsla = csla if return_sla: return xp, xsla return xp
[docs] def expand_tensor(self, link, new_dim, ctrl="R"): """Expand tensor along given link and to new dimension.""" newdim = list(self.shape) newdim[link] = max(new_dim - newdim[link], 0) expansion = QteaTensor(newdim, ctrl=ctrl, dtype=self.dtype, device=self.device) return self.stack_link(expansion, link)
@staticmethod def _gpu_idx(device_str): """ Extract the GPU index based on a device string. Parameters ---------- device_str : str Extract GPU index from this string. Returns ------- gpu_idx : int | None Returns `None` if device string is no GPU. If device string is GPU, returns index or index of current device. """ if not QteaTensor.is_gpu_static(device_str): return None if ":" not in device_str: return cp.cuda.runtime.getDevice() return int(device_str.split(":")[1]) @staticmethod def _gpu_idx_context(device_str): """ Build a context for the i-th GPU based on a device string. Parameters ---------- device_str : str Extract GPU index from this string. Returns ------- context : :class:`cp.cuda.Device` | :class:`nullcontext` Nullcontext is returned if device string is no GPU or if the index of the GPU corresponds to the current device. If the GPU index does not correspond to the current device, the corresponding context manager is returned to work on this device. """ # pylint: disable-next=protected-access gpu_idx = QteaTensor._gpu_idx(device_str) if gpu_idx is None: return nullcontext() current = cp.cuda.runtime.getDevice() if current == gpu_idx: return nullcontext() return cp.cuda.Device(gpu_idx) # no gpu_switch necessary, only calls from split_qr def _split_qr_dim(self, rows, cols): """Split via QR knowing dimension of rows and columns.""" xp = self._device_checks() if self.dtype == xp.float16: matrix = self._elem.astype(xp.float32).reshape(rows, cols) qmat, rmat = xp.linalg.qr(matrix) qmat = qmat.astype(xp.float16) rmat = rmat.astype(xp.float16) else: qmat, rmat = xp.linalg.qr(self._elem.reshape(rows, cols)) qtens = QteaTensor.from_elem_array(qmat, dtype=self.dtype, device=self.device) rtens = QteaTensor.from_elem_array(rmat, dtype=self.dtype, device=self.device) return qtens, rtens # no gpu_switch necessary, only calls from split_rq def _split_rq_dim(self, rows, cols): """Split via RQ knowing dimension of rows and columns.""" xp = self._device_checks() assert xp == np if self.dtype == xp.float16: matrix = self._elem.astype(xp.float32).reshape(rows, cols) rmat, qmat = sla.rq(matrix, mode="economic", check_finite=False) rmat = rmat.astype(xp.float16) qmat = qmat.astype(xp.float16) else: rmat, qmat = sla.rq( self._elem.reshape(rows, cols), mode="economic", check_finite=False ) rtens = QteaTensor.from_elem_array(rmat, dtype=self.dtype, device=self.device) qtens = QteaTensor.from_elem_array(qmat, dtype=self.dtype, device=self.device) return rtens, qtens # no gpu_switch necessary, only calls from _truncate_singvals def _truncate_sv_ratio(self, singvals, conv_params): """ Truncate the singular values based on the ratio with the biggest one. Parameters ---------- singvals : np.ndarray Array of singular values conv_params : :py:class:`TNConvergenceParameters`, optional Convergence parameters to use in the procedure. Returns ------- cut : int Number of singular values kept """ xp = self._device_checks() # Truncation lambda1 = singvals[0] cut = xp.nonzero(singvals / lambda1 < conv_params.cut_ratio)[0] if self.is_gpu(): cut = cut.get() chi_now = len(singvals) chi_by_conv = conv_params.max_bond_dimension chi_by_ratio = cut[0] if len(cut) > 0 else chi_now chi_min = conv_params.min_bond_dimension return self._truncate_decide_chi(chi_now, chi_by_conv, chi_by_ratio, chi_min) # no gpu_switch necessary, only calls from _truncate_singvals def _truncate_sv_norm(self, singvals, conv_params): """ Truncate the singular values based on the total norm cut Parameters ---------- singvals : np.ndarray Array of singular values conv_params : :py:class:`TNConvergenceParameters`, optional Convergence parameters to use in the procedure. Returns ------- cut : int Number of singular values kept """ xp = self._device_checks() norm = (singvals[::-1] ** 2).cumsum() / (singvals**2).sum() # You get the first index where the constraint is broken, # so you need to stop an index before cut = xp.nonzero(norm > conv_params.cut_ratio)[0] if self.is_gpu(): cut = cut.get() chi_now = len(singvals) chi_by_conv = conv_params.max_bond_dimension chi_by_norm = len(singvals) - cut[0] if len(cut) > 0 else chi_now chi_min = conv_params.min_bond_dimension return self._truncate_decide_chi(chi_now, chi_by_conv, chi_by_norm, chi_min)
# pylint: disable-next=too-many-branches
[docs] def _process_svd_ctrl(svd_ctrl, max_bond_dim, shape, device, contract_singvals): """ Process the svd_ctrl parameter for an SVD decomposition Parameters ---------- svd_ctrl: str SVD identifier chosen by the user max_bond_dim : int Maximum bond dimension shape: Tuple[int] Shape of the matrix to be split device: str Device where the splitting is taking place contract_singvals: str Where to contract the singvals Return ------ str The svd_ctrl after the double-check """ # First, resolve selection by user if svd_ctrl in ("V", "D", "R"): return svd_ctrl # Previously ratio above 3 were considered good, bond dimension was not considered for X ratio_svd_ctrl_r = shape[1] / shape[0] * int(contract_singvals == "R") ratio_svd_ctrl_l = shape[0] / shape[1] * int(contract_singvals == "L") ratio_chis = min(shape[0], shape[1]) / max_bond_dim if svd_ctrl in ["E!", "X!"]: # Enforce eigenvalue decompositions if (contract_singvals == "L") and (ratio_svd_ctrl_l >= 1.0): # Good shape return svd_ctrl.replace("!", "") if contract_singvals == "L": # Bad shape and we need QR return svd_ctrl.replace("!", "+QR") if (contract_singvals == "R") and (ratio_svd_ctrl_r >= 1.0): # Good shape return svd_ctrl.replace("!", "") if contract_singvals == "R": # Bad shape and we need QR return svd_ctrl.replace("!", "+QR") # An eigenvalue decomposition is nice if we contract the singular values # to the right with shape[0] < shape[1] OR # contract singular values to the left with shape[1] < shape[0] good_svd_ctrl_e = (ratio_svd_ctrl_r >= 3.0) or (ratio_svd_ctrl_l >= 3.0) good_svd_ctrl_x = ( (ratio_svd_ctrl_r >= 3.0) or (ratio_svd_ctrl_l >= 3.0) and (ratio_chis >= 2.0) ) if svd_ctrl == "E" and good_svd_ctrl_e: return svd_ctrl if svd_ctrl == "X" and good_svd_ctrl_x: # The order might be worth discussing, X will never be used if E is # okay unless user chooses it. return svd_ctrl if svd_ctrl in ("E", "X", "E!", "X!"): # We could still attempt to calculate more eigen # values than we need singular values, which can lead # to instabilities. The user was asking for E or X, # so we ignored the default bounds # Let the autoselect to its job logger_warning("Ignoring user input for svd_ctrl.") svd_ctrl = "A" # Sparse problem, but with no singvals contracted on cpu, # use random svd decomposition if min(shape) >= 2 * max_bond_dim and QteaTensor.is_cpu_static(device): return "R" # If none of the above works, go with automatic selection # First, if we do not need to compute all the singvals, use # sparse eigenvalue decomposition if min(shape) >= 4 * max_bond_dim and good_svd_ctrl_x: return "X" # Non-sparse problem on GPU, with singvals contracted # to left or right if QteaTensor.is_gpu_static(device) and good_svd_ctrl_e: return "E" if good_svd_ctrl_e: # We should use eigendecomposition approach as well # on CPU return "E" # If everything else fails, we go for normal svd return "D"
[docs] class DataMoverNumpyCupy(_AbstractDataMover): """ Data mover to move QteaTensor between numpy and cupy. Details ------- Streams are linked to device as indicated by the following page: https://docs.nvidia.com/cuda/cuda-c-programming-guide/index.html#stream-and-event-behavior and multi-GPU behavior: https://docs.nvidia.com/cuda/cuda-c-programming-guide/index.html#multi-gpu """ tensor_cls = (QteaTensor,) def __init__(self): if GPU_AVAILABLE: self.mover_stream = [] for ii in range(NUM_GPUS): with cp.cuda.Device(ii): self.mover_stream.append(cp.cuda.Stream(non_blocking=True)) # cupy examples show that mempool depends on the current device # upon call time # https://docs.cupy.dev/en/stable/user_guide/memory.html#limiting-gpu-memory-usage self.mempool = cp.get_default_memory_pool() # Seems to be unused self.pinned_mempool = cp.get_default_pinned_memory_pool() else: self.mover_stream = None self.mempool = None self.pinned_mempool = None @property def device_memory(self): """Current memory occupied in the device.""" return self.mempool.used_bytes()
[docs] def sync_move(self, tensor, device): """ Move the tensor `tensor` to the device `device` synchronously with the main computational stream. Parameters ---------- tensor : _AbstractTensor The tensor to be moved device: str The device where to move the tensor """ if GPU_AVAILABLE: tensor.convert(dtype=None, device=device)
[docs] def async_move(self, tensor, device, stream=None): """ Move the tensor `tensor` to the device `device` asynchronously with respect to the main computational stream. Parameters ---------- tensor : _AbstractTensor The tensor to be moved device: str The device where to move the tensor stream : stream-object Stream to be used to move the data if a stream different from the data mover's stream should be used. Default to None (use DataMover's stream) """ # To find the GPU index, we have to know if source or target is on GPU if stream is not None: # Do not interfere with user suggestion pass elif tensor.is_gpu(): # Tensor is on GPU, take the corresponding stream # pylint: disable-next=protected-access gpu_idx = QteaTensor._gpu_idx(tensor.device) stream = self.mover_stream[gpu_idx] elif tensor.is_gpu(query=device): # Target should be GPU, take the corresponding stream # pylint: disable-next=protected-access gpu_idx = QteaTensor._gpu_idx(device) stream = self.mover_stream[gpu_idx] # else: # CPU --> CPU, but call anyway in case of new devices (stream is already None) tensor.convert(dtype=None, device=device, stream=stream)
[docs] def wait(self, device=None): """ Put a barrier for the streams and wait them. Parameters ---------- device : str | None, optional If `None`, all streams will be synchronized. Default to `None` """ if self.mover_stream is None: return if device is None: # Have to synchronize all of them for ii, stream in enumerate(self.mover_stream): # pylint: disable-next=protected-access with QteaTensor._gpu_idx_context(f"gpu:{ii}"): stream.synchronize() return # pylint: disable-next=protected-access gpu_idx = QteaTensor._gpu_idx(device) if gpu_idx is not None: # pylint: disable-next=protected-access with QteaTensor._gpu_idx_context(f"gpu:{gpu_idx}"): self.mover_stream[gpu_idx].synchronize()